package UDF

import Source.SensorReading
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala._
import org.apache.flink.table.functions.TableFunction
import org.apache.flink.types.Row

/**
 * Demo job for a user-defined table function (UDTF).
 *
 * Reads sensor readings from a comma-separated text file, exposes them as a table
 * with an event-time attribute, and expands every row with the [[Split]] table
 * function, once through the Table API and once through SQL. Both results are
 * printed to standard output.
 */
object TableFunctionTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()

    val tableEnv = StreamTableEnvironment.create(env, settings)

    val inputPath = "src/main/resources/SensorReading"
    val inputStream = env.readTextFile(inputPath)

    // Convert each text line into the SensorReading case class.
    // Blank lines (e.g. a trailing newline at the end of the file) are skipped so
    // they do not fail the job with an ArrayIndexOutOfBoundsException.
    val dataStream = inputStream
      .filter(line => !line.trim.isEmpty)
      .map(line => parseReading(line))
      // Use the reading's own timestamp field as the event time, tolerating
      // up to one second of out-of-orderness.
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
          // NOTE(review): Flink expects epoch milliseconds here — confirm the
          // input file does not store seconds.
          override def extractTimestamp(t: SensorReading): Long = t.timeStamp
        })

    // 'timeStamp is declared as the rowtime attribute and renamed to 'ts.
    val sensorTable = tableEnv.fromDataStream(dataStream
      , 'id, 'temperature, 'timeStamp.rowtime as 'ts)

    // Table API: lateral join of every row with the rows emitted by Split for its id.
    val split = new Split("_")
    val resultTable = sensorTable
      .joinLateral(split('id) as('word, 'length))
      .select('id, 'ts, 'word, 'length)

    // SQL: the same query, using the function registered under the name "split".
    tableEnv.createTemporaryView("sensor", sensorTable)
    tableEnv.registerFunction("split", split)
    val resultSqlTable = tableEnv.sqlQuery(
      """
        |select
        | id,ts,word,length
        |from
        | sensor,lateral table(split(id)) as splitid(word,length)
        |""".stripMargin
    )

    resultTable.toAppendStream[Row].print("table")
    resultSqlTable.toAppendStream[Row].print("sql")
    env.execute()
  }

  /**
   * Parses one `id,timestamp,temperature` line into a [[SensorReading]].
   *
   * Fields are trimmed before conversion because `toLong` rejects surrounding
   * whitespace (for example the space after a comma in `a, 1, 2.0`).
   */
  private def parseReading(line: String): SensorReading = {
    val fields = line.split(",").map(_.trim)
    SensorReading(fields(0), fields(1).toLong, fields(2).toDouble)
  }
}

/**
 * Table function that splits a string into words and emits each word together
 * with its character count.
 *
 * @param separator delimiter passed to `String.split`; note that `String.split`
 *                  interprets it as a regular expression
 */
class Split(separator: String) extends TableFunction[(String, Int)] {
  /**
   * Emits one `(word, length)` row for every token of `s`.
   *
   * A null input emits no rows instead of throwing a NullPointerException,
   * so a single NULL value cannot fail the whole job.
   *
   * @param s the string to split; may be null
   */
  def eval(s: String): Unit = {
    // Option(...) maps a null argument to None, which makes the loop a no-op.
    Option(s).foreach { value =>
      value.split(separator).foreach(word => collect((word, word.length)))
    }
  }
}